[SPARK-45484][SQL] Fix the bug that uses incorrect parquet compression codec lz4raw #43310
Conversation
```diff
@@ -105,7 +114,7 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
   test("Create parquet table with compression") {
     Seq(true, false).foreach { isPartitioned =>
-      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4")
+      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
```
Before this fix, if we used `LZ4RAW` here, the test case would fail:
realCompressionCodecs.forall(((x$1: String) => x$1.==(compressionCodec))) was false
ScalaTestFailureLocation: org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite at (ParquetCompressionCodecPrecedenceSuite.scala:101)
org.scalatest.exceptions.TestFailedException: realCompressionCodecs.forall(((x$1: String) => x$1.==(compressionCodec))) was false
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$checkCompressionCodec$2(ParquetCompressionCodecPrecedenceSuite.scala:101)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:306)
at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:304)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.withTable(ParquetCompressionCodecPrecedenceSuite.scala:30)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$checkCompressionCodec$1(ParquetCompressionCodecPrecedenceSuite.scala:96)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$checkCompressionCodec$1$adapted(ParquetCompressionCodecPrecedenceSuite.scala:94)
at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1(SQLTestUtils.scala:79)
at org.apache.spark.sql.test.SQLTestUtils.$anonfun$withTempDir$1$adapted(SQLTestUtils.scala:78)
at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:245)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.org$apache$spark$sql$test$SQLTestUtils$$super$withTempDir(ParquetCompressionCodecPrecedenceSuite.scala:30)
at org.apache.spark.sql.test.SQLTestUtils.withTempDir(SQLTestUtils.scala:78)
at org.apache.spark.sql.test.SQLTestUtils.withTempDir$(SQLTestUtils.scala:77)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.withTempDir(ParquetCompressionCodecPrecedenceSuite.scala:30)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.checkCompressionCodec(ParquetCompressionCodecPrecedenceSuite.scala:94)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$10(ParquetCompressionCodecPrecedenceSuite.scala:110)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$10$adapted(ParquetCompressionCodecPrecedenceSuite.scala:109)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$9(ParquetCompressionCodecPrecedenceSuite.scala:109)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$9$adapted(ParquetCompressionCodecPrecedenceSuite.scala:107)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodecPrecedenceSuite.$anonfun$new$8(ParquetCompressionCodecPrecedenceSuite.scala:107)
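For context, here is a minimal sketch (a hypothetical helper, not the suite's actual `checkCompressionCodec` code) of how the codec recorded in a Parquet file footer can be inspected. Parquet reports the enum name `LZ4_RAW` in the footer metadata, which is why a comparison against the old spelling `lz4raw` fails:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import scala.jdk.CollectionConverters._

// Read the compression codec names recorded in a Parquet file's footer.
def footerCodecs(path: String): Seq[String] = {
  val file = HadoopInputFile.fromPath(new Path(path), new Configuration())
  val reader = ParquetFileReader.open(file)
  try {
    reader.getFooter.getBlocks.asScala
      .flatMap(_.getColumns.asScala)
      .map(_.getCodec.name()) // e.g. "LZ4_RAW", never "LZ4RAW"
      .distinct
      .toSeq
  } finally {
    reader.close()
  }
}
```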
Is this only a failure of the test utility method `checkCompressionCodec`? Or is there any user-facing issue?
Yes. `checkCompressionCodec` failed! It's a test utility method and unrelated to anything user-facing.
cc @wangyum FYI
```diff
@@ -59,7 +59,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
   // on Maven Central.
   override protected def availableCodecs: Seq[String] = {
-    Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4raw")
+    Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
```
In this case, it currently succeeds. Do you know the difference, @beliefer?
This test case succeeds. But https://github.com/apache/spark/pull/43310/files#r1352405312 failed!
```diff
      .version("1.1.1")
      .stringConf
      .transform(_.toLowerCase(Locale.ROOT))
      .checkValues(
-       Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw", "zstd"))
+       Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "lz4_raw", "zstd"))
```
Well, unfortunately, we cannot do it like this because Apache Spark 3.5.0 is already released.
I know that. But 3.5.0 was only just released; could we fix it?
No. If we remove this here, production jobs with the existing configuration will fail with Spark 3.5.1.
```diff
@@ -96,7 +96,7 @@ object ParquetOptions extends DataSourceOptions {
     "lzo" -> CompressionCodecName.LZO,
     "brotli" -> CompressionCodecName.BROTLI,
     "lz4" -> CompressionCodecName.LZ4,
-    "lz4raw" -> CompressionCodecName.LZ4_RAW,
```
Ditto. We cannot delete it like this. We can only add a new entry, like the next line, for backward compatibility.
This is a good idea.
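To make the suggestion concrete, here is a sketch of what the combined mapping could look like (names and ordering are illustrative, not necessarily the exact `ParquetOptions` code): the deprecated `lz4raw` is kept as an alias alongside the corrected `lz4_raw`, both resolving to Parquet's `LZ4_RAW` codec.

```scala
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Short codec names accepted from users, mapped to Parquet's codec enum.
// Keeping "lz4raw" avoids breaking jobs that already set it on Spark 3.5.x.
val shortParquetCompressionCodecNames: Map[String, CompressionCodecName] = Map(
  "none" -> CompressionCodecName.UNCOMPRESSED,
  "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
  "snappy" -> CompressionCodecName.SNAPPY,
  "gzip" -> CompressionCodecName.GZIP,
  "lzo" -> CompressionCodecName.LZO,
  "brotli" -> CompressionCodecName.BROTLI,
  "lz4" -> CompressionCodecName.LZ4,
  "lz4raw" -> CompressionCodecName.LZ4_RAW,  // deprecated alias, kept for compatibility
  "lz4_raw" -> CompressionCodecName.LZ4_RAW,
  "zstd" -> CompressionCodecName.ZSTD)
```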
```diff
@@ -1014,12 +1014,12 @@ object SQLConf {
       "`parquet.compression` is specified in the table-specific options/properties, the " +
       "precedence would be `compression`, `parquet.compression`, " +
       "`spark.sql.parquet.compression.codec`. Acceptable values include: none, uncompressed, " +
-      "snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.") +
+      "snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd.") +
```
What is the suffix of the file after changing to `LZ4_RAW`?
Before this PR:
part-00000-fc07d464-03b2-42d6-adc1-68a3adca1752.c000.lz4raw.parquet
After this PR:
part-00000-07244014-f31a-4097-8878-dd3630e721ce.c000.lz4raw.parquet
Since there is no change at the file name layer, it's good.
@dongjoon-hyun @wangyum Because Spark 3.5.0 has already been released, the name `lz4raw` is already exposed to users. Should we deprecate it first and remove it later?
Yes, I guess we can deprecate it at Spark 3.5.1 with a warning and remove it at Apache Spark 4.0.0. Since it's a fairly new config value, there will not be many users.
I will create a PR for branch-3.5 to deprecate `lz4raw`.
…codec lz4raw

### What changes were proposed in this pull request?
According to the discussion at #43310 (comment), this PR deprecates the incorrect parquet compression codec `lz4raw` at Spark 3.5.1 and adds a warning log. The warning log prompts users that `lz4raw` will be removed at Apache Spark 4.0.0.

### Why are the changes needed?
Deprecate the incorrect parquet compression codec `lz4raw`.

### Does this PR introduce _any_ user-facing change?
'Yes'. Users will see the warning log below.
`Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'`

### How was this patch tested?
Existing test cases and new test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43330 from beliefer/SPARK-45484_3.5.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
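As an illustration of the deprecation path described above (a minimal sketch, not the actual branch-3.5 change; the object and method names are hypothetical): the old spelling still resolves to `LZ4_RAW`, but a warning points users to the corrected name.

```scala
import java.util.Locale

import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.slf4j.LoggerFactory

// Sketch only: resolve a user-supplied codec name, warning on the deprecated
// alias. Other values are resolved via the Parquet enum directly; aliases
// such as "none" are not handled in this simplified example.
object ParquetCodecAlias {
  private val log = LoggerFactory.getLogger(getClass)

  def resolve(name: String): CompressionCodecName =
    name.toLowerCase(Locale.ROOT) match {
      case "lz4raw" =>
        log.warn("Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'")
        CompressionCodecName.LZ4_RAW
      case "lz4_raw" =>
        CompressionCodecName.LZ4_RAW
      case other =>
        CompressionCodecName.valueOf(other.toUpperCase(Locale.ROOT))
    }
}
```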
#43330 merged.
@dongjoon-hyun Do you have time to take a review?
cc @srowen
Merged. Thanks @dongjoon-hyun @srowen @HyukjinKwon @wangyum @LuciferYang
…pression codec

### What changes were proposed in this pull request?
This PR follows up #43310 to update the documentation of the parquet compression codec.

### Why are the changes needed?
Update the documentation of the parquet compression codec.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43464 from beliefer/SPARK-45484_followup.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
What changes were proposed in this pull request?
#41507 supported the new parquet compression codec `lz4raw`. But `lz4raw` is not a correct parquet compression codec name, and this mistake causes errors. Please refer to https://github.com/apache/spark/pull/43310/files#r1352405312

The root cause is that parquet uses `lz4_raw` as its name and stores it in the metadata of the parquet file. Please refer to spark/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
Line 65 in 6373f19

We should use `lz4_raw` as its name.

Why are the changes needed?
Fix the bug that uses the incorrect parquet compression codec `lz4raw`.

Does this PR introduce any user-facing change?
'Yes'.
Fix a bug.
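For illustration, a minimal usage sketch of the corrected codec name (the output path is hypothetical, and an active SparkSession named `spark` is assumed):

```scala
// Both the write option and the SQL config accept the corrected name "lz4_raw".
spark.conf.set("spark.sql.parquet.compression.codec", "lz4_raw")

spark.range(10)
  .write
  .mode("overwrite")
  .option("compression", "lz4_raw")
  .parquet("/tmp/spark-45484-demo")
```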
How was this patch tested?
New test cases.
Was this patch authored or co-authored using generative AI tooling?
'No'.